// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <glog/logging.h>
#include <stddef.h>

#include <memory>
#include <ostream>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

#include "common/status.h"
#include "exprs/json_functions.h"
#include "simdjson.h"
#include "util/defer_op.h"
#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_variant.h"
#include "vec/columns/subcolumn_tree.h"
#include "vec/common/assert_cast.h"
#include "vec/common/string_ref.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_nothing.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_string.h"
#include "vec/data_types/data_type_variant.h"
#include "vec/functions/function.h"
#include "vec/functions/function_helpers.h"
#include "vec/functions/simple_function_factory.h"
#include "vec/json/path_in_data.h"

namespace doris::vectorized {

class FunctionVariantElement : public IFunction {
public:
    // SQL-visible name of this function.
    static constexpr auto name = "element_at";
    // Factory used for registration: returns a new shared instance of this function.
    static FunctionPtr create() { return std::make_shared<FunctionVariantElement>(); }

    // Get function name.
    String get_name() const override { return name; }

    // Returns false: execute_impl strips Nullable from the first argument itself and
    // builds the result's null map in wrap_variant_nullable.
    bool use_default_implementation_for_nulls() const override { return false; }

    // The function takes exactly two arguments: the variant and the key.
    size_t get_number_of_arguments() const override { return 2; }

    // Argument index 1 (the key) is declared always-constant; get_element_column only
    // reads row 0 of that column.
    ColumnNumbers get_arguments_that_are_always_constant() const override { return {1}; }

    // Signature used for variadic resolution: (Variant, String).
    DataTypes get_variadic_argument_types_impl() const override {
        DataTypePtr variant_type = std::make_shared<vectorized::DataTypeVariant>();
        DataTypePtr key_type = std::make_shared<DataTypeString>();
        return {variant_type, key_type};
    }

    // Result type: Nullable(Variant) carrying the same max-subcolumns count as the
    // (possibly nullable) variant passed as the first argument.
    // Debug builds additionally verify that the arguments are (Variant, String).
    DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
        const auto& variant_arg = arguments[0];
        DCHECK_EQ(variant_arg->get_primitive_type(), TYPE_VARIANT)
                << "First argument for function: " << name
                << " should be DataTypeVariant but it has type " << variant_arg->get_name() << ".";
        DCHECK(is_string_type(arguments[1]->get_primitive_type()))
                << "Second argument for function: " << name << " should be String but it has type "
                << arguments[1]->get_name() << ".";

        // Look through Nullable to reach the concrete variant type.
        auto unwrapped_type = remove_nullable(variant_arg);
        const auto& variant_type = assert_cast<const DataTypeVariant&>(*unwrapped_type);
        const auto max_subcolumns = variant_type.variant_max_subcolumns_count();
        return make_nullable(std::make_shared<DataTypeVariant>(max_subcolumns));
    }

    // Wrap a variant column into a nullable column. The null map is chosen as follows:
    // 1. null root (empty or nothing as root): make_nullable(col, true)
    // 2. scalar variant with a nullable root: a copy of the root's null map, resized to
    //    the column's row count
    // 3. anything else (e.g. hierarchical variant): make_nullable(col)
    ColumnPtr wrap_variant_nullable(ColumnPtr col) const {
        const auto& variant = assert_cast<const ColumnVariant&>(*col);

        if (variant.is_null_root()) {
            return make_nullable(col, true);
        }

        const bool has_nullable_scalar_root =
                variant.is_scalar_variant() && variant.get_root()->is_nullable();
        if (!has_nullable_scalar_root) {
            return make_nullable(col);
        }

        const auto* nullable_root = assert_cast<const ColumnNullable*>(variant.get_root().get());
        auto null_map = nullable_root->get_null_map_column_ptr()->clone_resized(col->size());
        return ColumnNullable::create(col, std::move(null_map));
    }

    // Evaluate element_at(variant, key) for the whole block.
    //
    // arguments[0] must be a ColumnVariant once Nullable is stripped; otherwise a
    // RuntimeError naming both argument types is returned. For an empty block the result
    // position receives a freshly created column of the result type. Otherwise the element
    // column is extracted, wrapped as nullable when the result type is nullable, and stored
    // at position `result`.
    Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
                        uint32_t result, size_t input_rows_count) const override {
        const auto& variant_arg = block.get_by_position(arguments[0]);
        const auto* variant_col =
                check_and_get_column<ColumnVariant>(remove_nullable(variant_arg.column).get());
        if (variant_col == nullptr) {
            const auto& key_arg = block.get_by_position(arguments[1]);
            return Status::RuntimeError(
                    fmt::format("unsupported types for function {}({}, {})", get_name(),
                                variant_arg.type->get_name(), key_arg.type->get_name()));
        }

        if (block.empty()) {
            auto empty_result = block.get_by_position(result).type->create_column();
            block.replace_by_position(result, std::move(empty_result));
            return Status::OK();
        }

        auto key_column = block.get_by_position(arguments[1]).column;
        ColumnPtr element_column;
        RETURN_IF_ERROR(get_element_column(*variant_col, key_column, &element_column));

        const bool result_is_nullable = block.get_by_position(result).type->is_nullable();
        if (result_is_nullable) {
            element_column = wrap_variant_nullable(element_column);
        }
        block.replace_by_position(result, element_column);
        return Status::OK();
    }

private:
    // Return sub-path by specified prefix.
    // For example, for prefix a.b:
    // a.b.c.d -> c.d, a.b.c -> c
    //
    // Returns std::nullopt when `path` is not a strict descendant of `prefix`, i.e. when
    //   - `path` is not longer than `prefix` (a.b itself has no sub-path), or
    //   - the character right after the prefix is not '.' (a.bc is not under a.b), or
    //   - `path` does not begin with `prefix` at all (x.b.c is not under a.b).
    // The returned view aliases `path`; it is valid only as long as `path`'s buffer is.
    static std::optional<std::string_view> get_sub_path(const std::string_view& path,
                                                        const std::string_view& prefix) {
        const size_t prefix_len = prefix.size();
        if (path.size() <= prefix_len || path[prefix_len] != '.') {
            return std::nullopt;
        }
        // Verify the prefix itself so the helper does not depend on callers having
        // pre-filtered with starts_with(); without this, any path with a '.' at the right
        // offset would be accepted.
        if (path.compare(0, prefix_len, prefix) != 0) {
            return std::nullopt;
        }
        return path.substr(prefix_len + 1);
    }
    // Build the variant column that holds `src[key]` for every row of `src` and store it,
    // finalized, in `*result`.
    //
    // The key is read from row 0 of `index_column` only, so one key applies to all rows.
    //
    // Three cases are handled:
    //   1. `src.empty()`: the result is a new variant filled with `src.size()` defaults.
    //   2. `src` is a scalar variant whose root type is a string type: every root value is
    //      treated as a JSON document and the key is evaluated as a JSON path on it.
    //   3. otherwise: the element is assembled from the subcolumn tree and/or the sparse
    //      column of a clone_finalized() copy of `src`.
    //
    // Every return statement in this function yields Status::OK().
    static Status get_element_column(const ColumnVariant& src, const ColumnPtr& index_column,
                                     ColumnPtr* result) {
        // Only row 0 of the key column is consulted.
        std::string field_name = index_column->get_data_at(0).to_string();
        if (src.empty()) {
            *result = ColumnVariant::create(src.max_subcolumns_count());
            // src subcolumns empty but src row count may not be 0
            (*result)->assume_mutable()->insert_many_defaults(src.size());
            // ColumnVariant should be finalized before parsing, finalize may modify original column structure
            (*result)->assume_mutable()->finalize();
            return Status::OK();
        }
        if (src.is_scalar_variant() && is_string_type(src.get_root_type()->get_primitive_type())) {
            // use parser to extract from root
            auto type = std::make_shared<DataTypeString>();
            MutableColumnPtr result_column = type->create_column();
            // NOTE(review): the result of check_and_get_column is dereferenced without a null
            // check; this relies on the root being a (possibly nullable) ColumnString whenever
            // the root type is a string type — confirm.
            const ColumnString& docs =
                    *check_and_get_column<ColumnString>(remove_nullable(src.get_root()).get());
            simdjson::ondemand::parser parser;
            std::vector<JsonPath> parsed_paths;
            // A key that does not already start with '$' is turned into a JSON path by
            // prefixing "$.".
            if (field_name.empty() || field_name[0] != '$') {
                field_name = "$." + field_name;
            }
            JsonFunctions::parse_json_paths(field_name, &parsed_paths);
            ColumnString* col_str = assert_cast<ColumnString*>(result_column.get());
            for (size_t i = 0; i < docs.size(); ++i) {
                // When extraction fails for a document, a default value is inserted in its
                // place.
                if (!extract_from_document(parser, docs.get_data_at(i), parsed_paths, col_str)) {
                    VLOG_DEBUG << "failed to parse " << docs.get_data_at(i) << ", field "
                               << field_name;
                    result_column->insert_default();
                }
            }
            // The extracted strings become the root of a new variant column.
            *result = ColumnVariant::create(src.max_subcolumns_count(), type,
                                            std::move(result_column));
            (*result)->assume_mutable()->finalize();
            return Status::OK();
        } else {
            // Work on a clone_finalized() copy so that `src` itself is not touched.
            auto mutable_src = src.clone_finalized();
            auto* mutable_ptr = assert_cast<ColumnVariant*>(mutable_src.get());
            PathInData path(field_name);
            // NOTE(review): `subcolumns` is a local object, not a reference; `node` below
            // points into it.
            ColumnVariant::Subcolumns subcolumns = mutable_ptr->get_subcolumns();
            // `node` is nullptr when find_exact finds no subcolumn for `path`.
            const auto* node = subcolumns.find_exact(path);
            MutableColumnPtr result_col = ColumnVariant::create(src.max_subcolumns_count());
            ColumnVariant::Subcolumns new_subcolumns;

            // Fill `container` from the sparse column of the cloned source:
            //   - entries whose path is strictly below `path` are appended to the container's
            //     sparse column with the prefix (and the following '.') removed;
            //   - an entry whose path equals `path` is deserialized into a new root subcolumn.
            // Afterwards the root is installed in the container and its row count is set to
            // the source's row count.
            auto extract_from_sparse_column = [&](auto& container) {
                ColumnVariant::Subcolumn root {0, true, true};
                // no root, no sparse column
                const auto& sparse_data_map =
                        assert_cast<const ColumnMap&>(*mutable_ptr->get_sparse_column());
                const auto& src_sparse_data_offsets = sparse_data_map.get_offsets();
                const auto& src_sparse_data_paths =
                        assert_cast<const ColumnString&>(sparse_data_map.get_keys());
                const auto& src_sparse_data_values =
                        assert_cast<const ColumnString&>(sparse_data_map.get_values());
                auto& sparse_data_offsets =
                        assert_cast<ColumnMap&>(*container->get_sparse_column()->assume_mutable())
                                .get_offsets();
                auto [sparse_data_paths, sparse_data_values] =
                        container->get_sparse_data_paths_and_values();
                StringRef prefix_ref(path.get_path());
                std::string_view path_prefix(prefix_ref.data, prefix_ref.size);
                // One iteration per source row; [start, end) is that row's range of sparse
                // entries.
                for (size_t i = 0; i != src_sparse_data_offsets.size(); ++i) {
                    // NOTE(review): for i == 0 this indexes the offsets at -1; presumably the
                    // offsets container is left-padded so that this reads 0 — confirm.
                    size_t start = src_sparse_data_offsets[ssize_t(i) - 1];
                    size_t end = src_sparse_data_offsets[ssize_t(i)];
                    size_t lower_bound_index =
                            vectorized::ColumnVariant::find_path_lower_bound_in_sparse_data(
                                    prefix_ref, src_sparse_data_paths, start, end);
                    // Scan forward from the lower bound and stop at the first path that no
                    // longer starts with the prefix (presumably paths within a row are sorted
                    // — confirm).
                    for (; lower_bound_index != end; ++lower_bound_index) {
                        auto path_ref = src_sparse_data_paths.get_data_at(lower_bound_index);
                        std::string_view nested_path(path_ref.data, path_ref.size);
                        if (!nested_path.starts_with(path_prefix)) {
                            break;
                        }
                        // Don't include path that is equal to the prefix.
                        if (nested_path.size() != path_prefix.size()) {
                            auto sub_path_optional = get_sub_path(nested_path, path_prefix);
                            // Starts with the prefix but the next character is not '.', so it
                            // is not a child of the prefix; skip it.
                            if (!sub_path_optional.has_value()) {
                                continue;
                            }
                            std::string_view sub_path = *sub_path_optional;
                            sparse_data_paths->insert_data(sub_path.data(), sub_path.size());
                            sparse_data_values->insert_from(src_sparse_data_values,
                                                            lower_bound_index);
                        } else {
                            // insert into root column, example:  access v['b'] and b is in sparse column
                            // data example:
                            // {"b" : 123}
                            // {"b" : {"c" : 456}}
                            // b may be in the sparse column while b.c is in a subcolumn; put `b` into the
                            // root column to distinguish it from "", which is the empty path and root
                            root.deserialize_from_sparse_column(&src_sparse_data_values,
                                                                lower_bound_index);
                        }
                    }
                    // sparse_data_offsets gains one entry per row (pushed just below). If root
                    // still has as many entries as sparse_data_offsets, nothing was
                    // deserialized into it for this row, so pad it with a default.
                    // (Assumes the container's sparse offsets were empty on entry.)
                    if (root.size() == sparse_data_offsets.size()) {
                        root.insert_default();
                    }
                    sparse_data_offsets.push_back(sparse_data_paths->size());
                }
                container->get_subcolumns().create_root(root);
                container->set_num_rows(mutable_ptr->size());
            };

            if (node != nullptr) {
                // The key matches a node of the subcolumn tree: collect the leaves below it
                // and re-add them under paths produced by copy_pop_front().
                std::vector<decltype(node)> nodes;
                PathsInData paths;
                ColumnVariant::Subcolumns::get_leaves_of_node(node, nodes, paths);
                for (const auto* n : nodes) {
                    PathInData new_path = n->path.copy_pop_front();
                    VLOG_DEBUG << "add node " << new_path.get_path()
                               << ", data size: " << n->data.size()
                               << ", finalized size: " << n->data.get_finalized_column().size()
                               << ", common type: " << n->data.get_least_common_type()->get_name();
                    // if new_path is empty, indicate it's the root column, but adding a root will return false when calling add
                    if (!new_subcolumns.add(new_path, n->data)) {
                        VLOG_DEBUG << "failed to add node " << new_path.get_path();
                    }
                }

                // handle the root node
                if (new_subcolumns.empty() && !nodes.empty()) {
                    // Nothing could be added above, so the single leaf becomes the root of
                    // the result. CHECK_EQ is fatal if there is more than one leaf here.
                    CHECK_EQ(nodes.size(), 1);
                    new_subcolumns.create_root(ColumnVariant::Subcolumn {
                            nodes[0]->data.get_finalized_column_ptr()->assume_mutable(),
                            nodes[0]->data.get_least_common_type(), true, true});
                    auto container = ColumnVariant::create(src.max_subcolumns_count(),
                                                           std::move(new_subcolumns));
                    result_col->insert_range_from(*container, 0, container->size());
                } else {
                    // Subcolumns were added: combine them with whatever the sparse column
                    // holds under the same prefix.
                    auto container = ColumnVariant::create(src.max_subcolumns_count(),
                                                           std::move(new_subcolumns));
                    container->clear_sparse_column();
                    extract_from_sparse_column(container);
                    result_col->insert_range_from(*container, 0, container->size());
                }
            } else {
                // No subcolumn for the key: the element can only come from the sparse column.
                auto container = ColumnVariant::create(src.max_subcolumns_count(),
                                                       std::move(new_subcolumns));
                extract_from_sparse_column(container);
                result_col->insert_range_from(*container, 0, container->size());
            }
            *result = result_col->get_ptr();
            // ColumnVariant should be finalized before parsing, finalize may modify original column structure
            (*result)->assume_mutable()->finalize();
            VLOG_DEBUG << "dump new object "
                       << static_cast<const ColumnVariant*>(result_col.get())->debug_string()
                       << ", path " << path.get_path();
            return Status::OK();
        }
    }

    // Parse `doc` as a JSON object, locate the value addressed by `paths` and append its
    // string form to `column`.
    //
    // Returns:
    //   - the error produced by JsonFunctions::extract_from_object when the lookup fails;
    //   - DataQualityError when simdjson throws (e.g. while iterating the document or
    //     reading the value);
    //   - OK once the value has been written.
    static Status extract_from_document(simdjson::ondemand::parser& parser, const StringRef& doc,
                                        const std::vector<JsonPath>& paths, ColumnString* column) {
        try {
            // simdjson iterates over a padded buffer, so the input bytes are copied into one.
            simdjson::padded_string padded_doc {doc.data, doc.size};
            simdjson::ondemand::document parsed = parser.iterate(padded_doc);
            simdjson::ondemand::object root_object = parsed.get_object();

            simdjson::ondemand::value target;
            RETURN_IF_ERROR(JsonFunctions::extract_from_object(root_object, paths, &target));
            _write_data_to_column(target, column);
        } catch (const simdjson::simdjson_error& e) {
            VLOG_DEBUG << "simdjson parse exception: " << e.what();
            return Status::DataQualityError("simdjson parse exception {}", e.what());
        }
        return Status::OK();
    }

    // Append one JSON value to `column` as a string:
    //   - JSON null    -> the column's default value
    //   - JSON boolean -> "1" for true, "0" for false
    //   - anything else -> the text returned by simdjson::to_json_string
    // simdjson errors surface as exceptions and are left for the caller to handle.
    static void _write_data_to_column(simdjson::ondemand::value& value, ColumnString* column) {
        const simdjson::ondemand::json_type kind = value.type();

        if (kind == simdjson::ondemand::json_type::null) {
            column->insert_default();
            return;
        }

        if (kind == simdjson::ondemand::json_type::boolean) {
            const bool flag = value.get_bool();
            column->insert_data(flag ? "1" : "0", 1);
            return;
        }

        const auto raw_json = simdjson::to_json_string(value).value();
        column->insert_data(raw_json.data(), raw_json.size());
    }
};

// Register FunctionVariantElement with the given function factory.
// (Presumably it is keyed by FunctionVariantElement::name, "element_at" — confirm in
// SimpleFunctionFactory::register_function.)
void register_function_variant_element(SimpleFunctionFactory& factory) {
    factory.register_function<FunctionVariantElement>();
}

} // namespace doris::vectorized
